package com.nx.platform.es.biz.esspider.deliver;

import com.google.gson.Gson;
import com.nx.platform.es.biz.esspider.entity.Message;
import com.nx.platform.es.biz.esspider.exception.DuplicatedProcessorException;
import com.nx.platform.es.biz.esspider.exception.IllegalMessageException;
import com.nx.platform.es.biz.esspider.exception.IllegalProcessorException;
import com.nx.platform.es.biz.esspider.exception.UnregisteredProcessorException;
import com.nx.platform.es.biz.esspider.process.Processor;
import com.nx.platform.es.common.utils.MoreThreadFactorys;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;

/**
 * 派发方法。
 * queue 存储所有的消息
 * 根据
 * @author
 * @since 2017年3月24日
 */
@Slf4j
public class DelivererImpl implements Deliverer {

    private final Map<String, Worker> deliverers;

    public DelivererImpl() {
        this.deliverers = new HashMap<>();
    }

    public void register(@NotNull Processor processor)
            throws RuntimeException {
        String code = processor.getCode();
        if (StringUtils.isEmpty(code)) {
            throw new IllegalProcessorException(processor);
        }
        synchronized (this) {
            if (deliverers.containsKey(code)) {
                throw new DuplicatedProcessorException(processor);
            }
            deliverers.put(code, new Worker(processor));
        }
        log.info("deliver:{}", new Gson().toJson(deliverers));
    }

    @Override
    public void deliver(Message message)
            throws RuntimeException {
        if (message == null || StringUtils.isEmpty(message.getType()) || message.getId() == null) {
            throw new IllegalMessageException(message);
        }
        String type = message.getType();
        Worker deliverer = deliverers.get(type);
        if (deliverer == null) {
            throw new UnregisteredProcessorException(type);
        }
        deliverer.deliver(message);
    }

    private static class Worker {

        private static final Message POISON = new Message();

        private final SynchronousQueue<Message> queue;

        Worker(@NotNull Processor processor) {
            this.queue = new SynchronousQueue<>();
            String name = "Deliverer(" + processor.getCode() + ")";
            ThreadFactory factory = MoreThreadFactorys.newSingleThreadNameableDaemonThreadFactory(name);
            Executors.newSingleThreadExecutor(factory).execute(() -> {
                while (true) {
                    try {
                        Message message = queue.take();
                        if (message == POISON) {
                            break;
                        }
                        try {
                            processor.deliver(message);
                        } catch (Exception e) {
                            log.error("deliver error, message={} {}", message, e.getMessage());
                        }
                    } catch (InterruptedException e) {
                        log.error("deliver interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        void deliver(@Nullable Message message) {
            if (message != null) {
                try {
                    queue.put(message);
                } catch (Exception e) {
                    log.error("deliver error, message={} {}", message, e.getMessage());
                }
            }
        }

    }

}
